{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e08b9f04",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Creator: Joe Tatarka\n",
    "# Last Updated: April 16, 2025\n",
    "# Purpose: Merge together the spend and visits data sets and do some preliminary cleaning \n",
    "# Note: Have to copy spend pull and visits pull from generally useful datasets folder to vulcan scratch space in order to use spark\n",
    "# Spark has to use a distributed file system and so it can't read directly from the project folder\n",
    "# May have to recopy data into scratch, need to check the data to see if it has been modified\n",
    "# Bash commands to copy data into scratch:\n",
    "# cp -TRv /project/fagoolsb/generally_useful_datasets/safegraph_spend_03_19_24_pull /scratch/jtatarka/safegraph_spend_03_19_24_pull\n",
    "#cp -TRv /project/fagoolsb/generally_useful_datasets/safegraph_04_18_23_pull /scratch/rebeccagoldgof/service_industries/safegraph_04_18_23_pull"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "ce9fa26e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load in Spark Packages\n",
    "import pyspark\n",
    "import pyspark.pandas as pd\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "from pyspark.sql.functions import from_json, col\n",
    "from pyspark.sql.types import StructType, StructField, StringType \n",
    "from pyspark.sql.functions import lit, coalesce, greatest, regexp_replace\n",
    "\n",
    "from functools import reduce \n",
    "from operator import add \n",
    "from pyspark.sql.types import DoubleType\n",
    "\n",
    "\n",
    "from pyspark.sql.window import Window\n",
    "from pyspark.sql.functions import col, lag, count, dense_rank, desc\n",
    "\n",
    "from pyspark.sql.functions import udf, size\n",
    "from pyspark.sql.types import IntegerType\n",
    "from pyspark.sql.functions import expr, split, substring"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "a827c984",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "25/04/16 09:55:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "\n",
       "            <div>\n",
       "                <p><b>SparkSession - hive</b></p>\n",
       "                \n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://hdpen01.chicagobooth.edu:4043\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v3.3.1</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>yarn</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>PySparkShell</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        \n",
       "            </div>\n",
       "        "
      ],
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7f6b904da570>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Get or create the Spark session under this notebook's application name\n",
    "session_builder = SparkSession.builder.appName(\"jtatarka-1_merge_visits_and_spend\")\n",
    "spark = session_builder.getOrCreate()\n",
    "spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "82686ca5",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# Load the monthly spend extracts from scratch (header row, no schema inference)\n",
    "spend_reader = (\n",
    "    spark.read\n",
    "    .option(\"header\", \"true\")\n",
    "    .option(\"escape\", \"\\\"\")\n",
    "    .option(\"inferSchema\", \"false\")\n",
    ")\n",
    "spend_df = spend_reader.csv(\"file:///scratch/jtatarka/safegraph_spend_03_19_24_pull/monthly_spend/*.csv.gz\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "4aef80dd",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['PLACEKEY',\n",
       " 'SAFEGRAPH_BRAND_IDS',\n",
       " 'BRANDS',\n",
       " 'SPEND_DATE_RANGE_START',\n",
       " 'SPEND_DATE_RANGE_END',\n",
       " 'RAW_TOTAL_SPEND',\n",
       " 'RAW_NUM_TRANSACTIONS',\n",
       " 'RAW_NUM_CUSTOMERS',\n",
       " 'MEDIAN_SPEND_PER_TRANSACTION',\n",
       " 'MEDIAN_SPEND_PER_CUSTOMER',\n",
       " 'SPEND_PER_TRANSACTION_PERCENTILES',\n",
       " 'SPEND_BY_DAY',\n",
       " 'SPEND_PER_TRANSACTION_BY_DAY',\n",
       " 'SPEND_BY_DAY_OF_WEEK',\n",
       " 'DAY_COUNTS',\n",
       " 'SPEND_PCT_CHANGE_VS_PREV_MONTH',\n",
       " 'SPEND_PCT_CHANGE_VS_PREV_YEAR',\n",
       " 'ONLINE_TRANSACTIONS',\n",
       " 'ONLINE_SPEND',\n",
       " 'TRANSACTION_INTERMEDIARY',\n",
       " 'SPEND_BY_TRANSACTION_INTERMEDIARY',\n",
       " 'BUCKETED_CUSTOMER_FREQUENCY',\n",
       " 'MEAN_SPEND_PER_CUSTOMER_BY_FREQUENCY',\n",
       " 'BUCKETED_CUSTOMER_INCOMES',\n",
       " 'MEAN_SPEND_PER_CUSTOMER_BY_INCOME',\n",
       " 'CUSTOMER_HOME_CITY',\n",
       " 'RELATED_CROSS_SHOPPING_PHYSICAL_BRANDS_PCT',\n",
       " 'RELATED_CROSS_SHOPPING_ONLINE_MERCHANTS_PCT',\n",
       " 'RELATED_CROSS_SHOPPING_SAME_CATEGORY_BRANDS_PCT',\n",
       " 'RELATED_CROSS_SHOPPING_LOCAL_BRANDS_PCT',\n",
       " 'RELATED_WIRELESS_CARRIER_PCT',\n",
       " 'RELATED_STREAMING_CABLE_PCT',\n",
       " 'RELATED_DELIVERY_SERVICE_PCT',\n",
       " 'RELATED_RIDESHARE_SERVICE_PCT',\n",
       " 'RELATED_BUYNOWPAYLATER_SERVICE_PCT',\n",
       " 'RELATED_PAYMENT_PLATFORM_PCT']"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Display the column names of the raw spend data as read from the CSV headers\n",
    "spend_df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "4ffa7a73",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# Load the monthly visits (patterns) extracts from scratch (header row, no schema inference)\n",
    "visits_reader = (\n",
    "    spark.read\n",
    "    .option(\"header\", \"true\")\n",
    "    .option(\"escape\", \"\\\"\")\n",
    "    .option(\"inferSchema\", \"false\")\n",
    ")\n",
    "visits_df = visits_reader.csv(\"file:///scratch/rebeccagoldgof/service_industries/safegraph_04_18_23_pull/20[0-9][0-9]/[0-9][0-9]/01/SAFEGRAPH/MP/*.csv.gz\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "dbd1521d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['placekey',\n",
       " 'parent_placekey',\n",
       " 'safegraph_brand_ids',\n",
       " 'location_name',\n",
       " 'brands',\n",
       " 'store_id',\n",
       " 'top_category',\n",
       " 'sub_category',\n",
       " 'naics_code',\n",
       " 'latitude',\n",
       " 'longitude',\n",
       " 'street_address',\n",
       " 'city',\n",
       " 'region',\n",
       " 'postal_code',\n",
       " 'open_hours',\n",
       " 'category_tags',\n",
       " 'opened_on',\n",
       " 'closed_on',\n",
       " 'tracking_closed_since',\n",
       " 'websites',\n",
       " 'geometry_type',\n",
       " 'polygon_wkt',\n",
       " 'polygon_class',\n",
       " 'enclosed',\n",
       " 'phone_number',\n",
       " 'is_synthetic',\n",
       " 'includes_parking_lot',\n",
       " 'iso_country_code',\n",
       " 'wkt_area_sq_meters',\n",
       " 'date_range_start',\n",
       " 'date_range_end',\n",
       " 'raw_visit_counts',\n",
       " 'raw_visitor_counts',\n",
       " 'visits_by_day',\n",
       " 'poi_cbg',\n",
       " 'visitor_home_cbgs',\n",
       " 'visitor_home_aggregation',\n",
       " 'visitor_daytime_cbgs',\n",
       " 'visitor_country_of_origin',\n",
       " 'distance_from_home',\n",
       " 'median_dwell',\n",
       " 'bucketed_dwell_times',\n",
       " 'related_same_day_brand',\n",
       " 'related_same_month_brand',\n",
       " 'popularity_by_hour',\n",
       " 'popularity_by_day',\n",
       " 'device_type',\n",
       " 'normalized_visits_by_state_scaling',\n",
       " 'normalized_visits_by_region_naics_visits',\n",
       " 'normalized_visits_by_region_naics_visitors',\n",
       " 'normalized_visits_by_total_visits',\n",
       " 'normalized_visits_by_total_visitors']"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Display the column names of the raw visits data as read from the CSV headers\n",
    "visits_df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "87f46a27",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean up the spend data: add the month key and keep only the listed fields\n",
    "spend_keep_cols = [\n",
    "    \"placekey\", \"raw_total_spend\", \"year_month_gs\", \"raw_num_transactions\", \"raw_num_customers\",\n",
    "    \"online_transactions\", \"online_spend\", \"transaction_intermediary\",\n",
    "    \"spend_by_transaction_intermediary\", \"related_delivery_service_pct\",\n",
    "]\n",
    "# year_month_gs is the first 7 characters of the spend date range start\n",
    "spend_month_key = spend_df[\"spend_date_range_start\"].substr(1, 7)\n",
    "spend_df = spend_df.withColumn(\"year_month_gs\", spend_month_key).select(*spend_keep_cols)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "f55644d7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Schemas used to extract the variables that are stored as JSON strings.\n",
    "# Every field is read as a nullable string.\n",
    "\n",
    "# Intermediate Transaction Schema\n",
    "schema_1 = StructType([\n",
    "    StructField(platform, StringType(), True)\n",
    "    for platform in ('DoorDash', 'Seamless', 'Grubhub', 'Postmates', 'Favor', 'Tapingo')\n",
    "])\n",
    "\n",
    "# Related Delivery Spend Schema\n",
    "schema_2 = StructType([\n",
    "    StructField(service, StringType(), True)\n",
    "    for service in ('Uber Eats', 'DoorDash', 'Grubhub', 'Dashpass', 'Postmates')\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "205d0ae4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Add one column per service from the related delivery service JSON (parsed with schema_2)\n",
    "delivery_services = ['Uber Eats', 'DoorDash', 'Grubhub', 'Dashpass', 'Postmates']\n",
    "spend_df = (\n",
    "    spend_df\n",
    "    .withColumn(\"related_delivery_service_pct\", from_json(\"related_delivery_service_pct\", schema_2))\n",
    "    .select(\"*\", col('related_delivery_service_pct.*'))\n",
    ")\n",
    "\n",
    "# Take the maximum of the related delivery service percentages.\n",
    "# schema_2 reads every value as a string, so each column is cast to double first: without the\n",
    "# cast, coalesce(string, double) resolves to string under Spark's default type coercion and\n",
    "# greatest() then compares text rather than numbers (e.g. '9' would beat '10').\n",
    "# Services missing from a row (null) count as 0; the coalesce handles this, so the former\n",
    "# na.fill(0) -- which only fills numeric columns and every column here is a string -- is dropped.\n",
    "zero_pct = lit(0.0)\n",
    "rowmax = greatest(*[coalesce(col(x).cast(\"double\"), zero_pct) for x in delivery_services])\n",
    "spend_df = spend_df.withColumn(\"max_delivery_pct\", rowmax)\n",
    "\n",
    "# Drop the variables that are no longer needed\n",
    "spend_df = spend_df.drop(*delivery_services, 'related_delivery_service_pct')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "1cf8f9c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Bring in the delivery spend variables from the intermediate transaction schema\n",
    "intermediary_cols = ['DoorDash', 'Seamless', 'Grubhub', 'Postmates', 'Favor', 'Tapingo']\n",
    "\n",
    "spend_df = (\n",
    "    spend_df\n",
    "    .withColumn(\"spend_by_transaction_intermediary\", from_json(\"spend_by_transaction_intermediary\", schema_1))\n",
    "    .select(\"*\", col('spend_by_transaction_intermediary.*'))\n",
    ")\n",
    "\n",
    "# Intermediaries missing from a row come back null; replace them with 0\n",
    "spend_df = spend_df.fillna(dict.fromkeys(intermediary_cols, 0))\n",
    "\n",
    "# Row-wise total across the intermediaries\n",
    "delivery_spend_total = reduce(add, [col(name) for name in intermediary_cols])\n",
    "spend_df = spend_df.na.fill(0).withColumn(\"delivery_spend\", delivery_spend_total)\n",
    "\n",
    "# The per-intermediary columns and the parsed struct are no longer needed\n",
    "spend_df = spend_df.drop(*intermediary_cols, 'spend_by_transaction_intermediary')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "d6b69e29",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Same steps once more, this time for the number of transactions per intermediary\n",
    "intermediary_cols = ['DoorDash', 'Seamless', 'Grubhub', 'Postmates', 'Favor', 'Tapingo']\n",
    "\n",
    "spend_df = (\n",
    "    spend_df\n",
    "    .withColumn(\"transaction_intermediary\", from_json(\"transaction_intermediary\", schema_1))\n",
    "    .select(\"*\", col('transaction_intermediary.*'))\n",
    ")\n",
    "\n",
    "# Intermediaries missing from a row come back null; replace them with 0\n",
    "spend_df = spend_df.fillna(dict.fromkeys(intermediary_cols, 0))\n",
    "\n",
    "# Row-wise total across the intermediaries\n",
    "delivery_transactions_total = reduce(add, [col(name) for name in intermediary_cols])\n",
    "spend_df = spend_df.na.fill(0).withColumn(\"delivery_transactions\", delivery_transactions_total)\n",
    "\n",
    "# Drop unnecessary variables\n",
    "spend_df = spend_df.drop(*intermediary_cols, 'transaction_intermediary')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "f39a2916",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean up the visits data: add the month key and keep only the listed fields\n",
    "visits_keep_cols = [\n",
    "    \"placekey\", \"brands\", \"latitude\", \"longitude\", \"postal_code\", \"naics_code\", \"poi_cbg\",\n",
    "    \"bucketed_dwell_times\", \"popularity_by_hour\", \"region\", \"year_month_gs\",\n",
    "    \"open_hours\", \"opened_on\", \"closed_on\",\n",
    "]\n",
    "# year_month_gs is the first 7 characters of the visits date range start\n",
    "visits_month_key = visits_df[\"date_range_start\"].substr(1, 7)\n",
    "visits_df = visits_df.withColumn(\"year_month_gs\", visits_month_key).select(*visits_keep_cols)\n",
    "\n",
    "# Restrict to Full-Service Restaurants and Limited Service Eating Places with a non-missing month\n",
    "restaurant_list = [722511, 722513, 722514, 722515]\n",
    "is_restaurant = visits_df.naics_code.isin(restaurant_list)\n",
    "visits_df = visits_df.filter(is_restaurant).filter(\"year_month_gs is not null\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "0524c544",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Split popularity_by_hour into 24 variables (visits_by_hour_1 ... visits_by_hour_24)\n",
    "\n",
    "# Remove brackets around the popularity_by_hour values so we can use split command\n",
    "visits_df = visits_df.withColumn(\"popularity_by_hour\", regexp_replace(visits_df[\"popularity_by_hour\"], \"[\\\\[\\\\]]\", \"\"))\n",
    "popularity_split = split(visits_df[\"popularity_by_hour\"], ',')\n",
    "\n",
    "# Turn the popularity_by_hour column into 24 columns with a single select.\n",
    "# Calling withColumn once per hour in a loop adds one projection per call, which the Spark\n",
    "# documentation warns can produce very large query plans; one select yields the same\n",
    "# columns in the same order.\n",
    "hourly_cols = [popularity_split.getItem(i).alias(f\"visits_by_hour_{i+1}\") for i in range(24)]\n",
    "visits_df = visits_df.select(\"*\", *hourly_cols)\n",
    "\n",
    "visits_df = visits_df.drop('popularity_by_hour')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "79efe95e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Split bucketed_dwell_times (a JSON string) into one variable per dwell-time bucket\n",
    "\n",
    "# Bucketed dwell times schema: one nullable string field per bucket label\n",
    "dwell_bucket_labels = ['<5', '5-10', '11-20', '21-60', '61-120', '121-240', '>240']\n",
    "schema_3 = StructType([StructField(label, StringType(), True) for label in dwell_bucket_labels])\n",
    "\n",
    "# Parse the JSON and expand the struct fields into top-level columns\n",
    "dwell_struct = from_json(\"bucketed_dwell_times\", schema_3)\n",
    "visits_df = visits_df.withColumn(\"bucketed_dwell_times\", dwell_struct).select(\"*\", col('bucketed_dwell_times.*'))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "2960e0c6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Give the dwell-time bucket columns variable-friendly names, then drop the parsed struct\n",
    "dwell_renames = {\n",
    "    \"<5\": \"dwell_less_5\",\n",
    "    \"5-10\": \"dwell_5_to_10\",\n",
    "    \"11-20\": \"dwell_11_to_20\",\n",
    "    \"21-60\": \"dwell_21_to_60\",\n",
    "    \"61-120\": \"dwell_61_to_120\",\n",
    "    \"121-240\": \"dwell_121_to_240\",\n",
    "    \">240\": \"dwell_240_plus\",\n",
    "}\n",
    "for bucket_label, new_name in dwell_renames.items():\n",
    "    visits_df = visits_df.withColumnRenamed(bucket_label, new_name)\n",
    "\n",
    "visits_df = visits_df.drop('bucketed_dwell_times')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "bb40f8a6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Merge the datasets with a left outer join of visits and spend:\n",
    "# every visits observation is kept, together with any spend observation that matches on\n",
    "# placekey and month (equivalent to specifying keep(1 3) in a Stata merge)\n",
    "same_place = visits_df[\"placekey\"] == spend_df[\"placekey\"]\n",
    "same_month = visits_df[\"year_month_gs\"] == spend_df[\"year_month_gs\"]\n",
    "merge_df = visits_df.join(spend_df, same_place & same_month, \"leftouter\")\n",
    "# Drop the spend-side copies of the join keys so each key appears only once\n",
    "merge_df = merge_df.drop(spend_df[\"placekey\"]).drop(spend_df[\"year_month_gs\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "dd550ba5",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    }
   ],
   "source": [
    "# Write the merged dataset to Hadoop as CSV with headers (overwriting any earlier output);\n",
    "# it is moved out of Hadoop afterwards\n",
    "merge_writer = merge_df.write.format(\"csv\").option(\"header\", True).mode('overwrite')\n",
    "merge_writer.save(\"/user/jtatarka/final_sample_merge\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "983718a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use bash commands in the terminal to copy the data to the project folder\n",
    "# hadoop fs -getmerge /user/jtatarka/final_sample_merge /project/fagoolsb/service_industries/replication_package/datasets/intermediate/1_main_build/7225_visits_and_spend_merge.csv\n",
    "# perl -i -ne 'print if $.==1 || !/^placekey/' /project/fagoolsb/service_industries/replication_package/datasets/intermediate/1_main_build/7225_visits_and_spend_merge.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "7adf7270",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Export the visits normalization panel\n",
    "visits_panel_reader = (\n",
    "    spark.read\n",
    "    .option(\"header\", \"true\")\n",
    "    .option(\"escape\", \"\\\"\")\n",
    "    .option(\"inferSchema\", \"false\")\n",
    ")\n",
    "visits_panel_df = visits_panel_reader.csv(\"file:///scratch/rebeccagoldgof/service_industries/safegraph_04_18_23_pull/20[0-9][0-9]/[0-9][0-9]/01/SAFEGRAPH/MP/normalization_stats.csv\")\n",
    "\n",
    "# Small enough to convert to pandas and write directly to the project folder\n",
    "visits_panel_pd = visits_panel_df.toPandas()\n",
    "visits_panel_pd.to_csv(\"/project/fagoolsb/service_industries/replication_package/datasets/intermediate/1_main_build/visits_normalization_panel.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "41d207f4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Export the spend summary panel\n",
    "spend_panel_reader = (\n",
    "    spark.read\n",
    "    .option(\"header\", \"true\")\n",
    "    .option(\"escape\", \"\\\"\")\n",
    "    .option(\"inferSchema\", \"false\")\n",
    ")\n",
    "spend_panel_df = spend_panel_reader.csv(\"file:///scratch/jtatarka//safegraph_spend_03_19_24_pull/spend_panel/*.csv\")\n",
    "\n",
    "# Convert to pandas and write directly to the project folder\n",
    "spend_panel_pd = spend_panel_df.toPandas()\n",
    "spend_panel_pd.to_csv(\"/project/fagoolsb/service_industries/replication_package/datasets/intermediate/1_main_build/spend_03_19_24_summary_panel.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "f50f702e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Stop the Spark session now that all outputs have been written\n",
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "da22dd91",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
